异步子Agent任务概述#
异步子Agent任务是Claude Code中处理长时间运行任务的重要机制。通过异步执行,子Agent可以在后台执行任务,主Agent可以继续处理其他请求,从而提高整体效率和响应速度。
异步任务的基本概念#
1. 同步 vs 异步#
示例:同步 vs 异步
同步执行:
python````python def process_task(task): """同步处理任务""" result1 = agent1.execute(task) result2 = agent2.execute(result1) result3 = agent3.execute(result2) return result3 # 执行时间: 10 + 15 + 20 = 45秒 ```> 异步执行: ``` python async def process_task_async(task): """异步处理任务""" result1 = await agent1.execute_async(task) result2 = await agent2.execute_async(result1) result3 = await agent3.execute_async(result2) return result3 # 执行时间: 20秒(并行执行) ### 2. 异步任务的优势 | 特性 | 同步任务 | 异步任务 | |------|----------|----------| | 执行方式 | 顺序执行 | 并行执行 | | 响应时间 | 长 | 短 | | 资源利用率 | 低 | 高 | | 用户体验 | 差 | 好 | | 复杂度 | 低 | 中 | ## 异步子Agent实现 ### 1. 基础异步子Agent ````python ````python # 示例:基础异步子Agent 用户请求: "创建一个异步代码生成子Agent" Claude Code 生成的代码: ```python ```python import asyncio from typing import Dict, Any import logging logger = logging.getLogger(__name__) class AsyncCodeGenerationAgent: """异步代码生成子Agent""" def __init__(self, context): self.context = context self.task_queue = asyncio.Queue() self.results = {} self.running = False async def start(self): """启动Agent""" self.running = True logger.info("AsyncCodeGenerationAgent started") # 启动任务处理器 asyncio.create_task(self._process_tasks()) async def stop(self): """停止Agent""" self.running = False logger.info("AsyncCodeGenerationAgent stopped") async def submit_task(self, task_id: str, task: Dict[str, Any]): """提交任务""" await self.task_queue.put((task_id, task)) logger.info(f"Task {task_id} submitted") async def get_result(self, task_id: str) -> Dict[str, Any]: """获取任务结果""" while task_id not in self.results: await asyncio.sleep(0.1) return self.results[task_id] async def _process_tasks(self): """处理任务""" while self.running: try: task_id, task = await asyncio.wait_for( self.task_queue.get(), timeout=1.0 ) logger.info(f"Processing task {task_id}") # 处理任务 result = await self._generate_code(task) # 保存结果 self.results[task_id] = result logger.info(f"Task {task_id} completed") except asyncio.TimeoutError: continue except Exception as e: logger.error(f"Error processing task: {e}") async def _generate_code(self, task: Dict[str, Any]) -> Dict[str, Any]: """生成代码""" # 模拟长时间运行的任务 await asyncio.sleep(2) code = f""" # Generated code for {task['name']} def {task['function_name']}(): pass """ return { 'code': code, 'status': 'completed', 'task_id': task.get('task_id') } # 使用示例 async def main(): """主函数""" agent = AsyncCodeGenerationAgent({}) # 启动Agent await agent.start() # 提交任务 task1 = {'name': 'Task 1', 'function_name': 'func1'} task2 = {'name': 'Task 2', 'function_name': 'func2'} task3 = {'name': 'Task 3', 'function_name': 'func3'} await agent.submit_task('task1', task1) await agent.submit_task('task2', task2) await agent.submit_task('task3', task3) # 获取结果 result1 = await agent.get_result('task1') result2 = await agent.get_result('task2') result3 = await agent.get_result('task3') print("Task 1 result:", result1['status']) print("Task 2 result:", result2['status']) print("Task 3 result:", result3['status']) # 停止Agent await agent.stop() if __name__ == '__main__': asyncio.run(main()) ```> **异步子Agent特点**: > - 使用asyncio实现异步处理 > - 任务队列管理 > - 结果存储和检索 > - 后台任务处理 ### 2. 高级异步子Agent ````python `python # 示例:高级异步子Agent 用户请求: "创建一个支持任务优先级和超时的异步子Agent" Claude Code 生成的代码: ```python ```python import asyncio from typing import Dict, Any, Optional from datetime import datetime, timedelta import heapq import logging logger = logging.getLogger(__name__) class AsyncPriorityAgent: """支持优先级的异步子Agent""" def __init__(self, context, max_workers=4): self.context = context self.task_queue = [] self.task_lock = asyncio.Lock() self.results = {} self.running = False self.max_workers = max_workers self.workers = [] self.task_stats = { 'submitted': 0, 'completed': 0, 'failed': 0, 'timeout': 0 } async def start(self): """启动Agent""" self.running = True logger.info(f"AsyncPriorityAgent started with {self.max_workers} workers") # 启动工作线程 for i in range(self.max_workers): worker = asyncio.create_task(self._worker(f"worker-{i}")) self.workers.append(worker) async def stop(self): """停止Agent""" self.running = False # 等待所有工作线程完成 await asyncio.gather(*self.workers, return_exceptions=True) logger.info("AsyncPriorityAgent stopped") async def submit_task( self, task_id: str, task: Dict[str, Any], priority: int = 0, timeout: Optional[float] = None ): """提交任务""" task_data = { 'task_id': task_id, 'task': task, 'priority': priority, 'timeout': timeout, 'submitted_at': datetime.utcnow(), 'status': 'pending' } async with self.task_lock: heapq.heappush(self.task_queue, (-priority, task_data)) self.task_stats['submitted'] += 1 logger.info(f"Task {task_id} submitted with priority {priority}") async def get_result(self, task_id: str, timeout: float = 30.0) -> Dict[str, Any]: """获取任务结果""" start_time = datetime.utcnow() while True: if task_id in self.results: return self.results[task_id] # 检查超时 elapsed = (datetime.utcnow() - start_time).total_seconds() if elapsed > timeout: raise TimeoutError(f"Task {task_id} timeout after {timeout}s") await asyncio.sleep(0.1) async def _worker(self, worker_name: str): """工作线程""" logger.info(f"{worker_name} started") while self.running: try: # 获取任务 task_data = await self._get_task() if task_data is None: await asyncio.sleep(0.1) continue task_id = task_data['task_id'] task = task_data['task'] timeout = task_data.get('timeout') logger.info(f"{worker_name} processing task {task_id}") # 执行任务 try: if timeout: result = await asyncio.wait_for( self._execute_task(task), timeout=timeout ) else: result = await self._execute_task(task) self.results[task_id] = { 'result': result, 'status': 'completed', 'worker': worker_name, 'completed_at': datetime.utcnow() } self.task_stats['completed'] += 1 logger.info(f"{worker_name} completed task {task_id}") except asyncio.TimeoutError: self.results[task_id] = { 'error': 'Task timeout', 'status': 'timeout', 'worker': worker_name } self.task_stats['timeout'] += 1 logger.warning(f"{worker_name} task {task_id} timeout") except Exception as e: self.results[task_id] = { 'error': str(e), 'status': 'failed', 'worker': worker_name } self.task_stats['failed'] += 1 logger.error(f"{worker_name} task {task_id} failed: {e}") except Exception as e: logger.error(f"{worker_name} error: {e}") await asyncio.sleep(1) logger.info(f"{worker_name} stopped") async def _get_task(self) -> Optional[Dict[str, Any]]: """获取任务""" async with self.task_lock: if self.task_queue: _, task_data = heapq.heappop(self.task_queue) task_data['status'] = 'processing' return task_data return None async def _execute_task(self, task: Dict[str, Any]) -> Any: """执行任务""" task_type = task.get('type', 'default') if task_type == 'code_generation': return await self._generate_code(task) elif task_type == 'code_review': return await self._review_code(task) elif task_type == 'test_generation': return await self._generate_tests(task) else: return await self._default_task(task) async def _generate_code(self, task: Dict[str, Any]) -> str: """生成代码""" await asyncio.sleep(2) return f""" # Generated code for {task['name']} def {task['function_name']}(): pass """ async def _review_code(self, task: Dict[str, Any]) -> Dict[str, Any]: """审查代码""" await asyncio.sleep(1.5) return { 'issues': [], 'suggestions': [], 'metrics': {} } async def _generate_tests(self, task: Dict[str, Any]) -> str: """生成测试""" await asyncio.sleep(1) return """ import unittest class TestGeneratedCode(unittest.TestCase): pass """ async def _default_task(self, task: Dict[str, Any]) -> Any: """默认任务""" await asyncio.sleep(1) return {'result': 'completed'} def get_stats(self) -> Dict[str, Any]: """获取统计信息""" return { 'stats': self.task_stats.copy(), 'queue_size': len(self.task_queue), 'active_workers': len(self.workers) } # 使用示例 async def main(): """主函数""" agent = AsyncPriorityAgent({}, max_workers=4) # 启动Agent await agent.start() # 提交不同优先级的任务 await agent.submit_task('task1', {'name': 'Low Priority', 'type': 'code_generation'}, priority=1) await agent.submit_task('task2', {'name': 'High Priority', 'type': 'code_review'}, priority=10) await agent.submit_task('task3', {'name': 'Medium Priority', 'type': 'test_generation'}, priority=5) await agent.submit_task('task4', {'name': 'Timeout Task', 'type': 'code_generation'}, priority=8, timeout=1.0) # 获取结果 try: result1 = await agent.get_result('task1') print("Task 1 result:", result1['status']) except TimeoutError as e: print(f"Task 1 error: {e}") try: result2 = await agent.get_result('task2') print("Task 2 result:", result2['status']) except TimeoutError as e: print(f"Task 2 error: {e}") try: result3 = await agent.get_result('task3') print("Task 3 result:", result3['status']) except TimeoutError as e: print(f"Task 3 error: {e}") try: result4 = await agent.get_result('task4')
bashprint("Task 4 result:", result4['status']) except TimeoutError as e: print(f"Task 4 error: {e}") # 获取统计信息 stats = agent.get_stats() print("\nStatistics:") print(f" Submitted: {stats['stats']['submitted']}") print(f" Completed: {stats['stats']['completed']}") print(f" Failed: {stats['stats']['failed']}") print(f" Timeout: {stats['stats']['timeout']}") print(f" Queue size: {stats['queue_size']}") # 停止Agent await agent.stop()
if name == 'main': asyncio.run(main())
bash> - 支持任务优先级 > - 支持任务超时 > - 多工作线程 > - 任务统计 > - 错误处理 ## 异步任务模式 ### 1. 任务编排模式 ````python `python # 示例:任务编排 用户请求: "实现一个任务编排器,协调多个异步子Agent" Claude Code 生成的代码: ```python ```python import asyncio from typing import Dict, Any, List import logging logger = logging.getLogger(__name__) class TaskOrchestrator: """任务编排器""" def __init__(self, context): self.context = context self.agents = {} self.task_graph = {} self.results = {} def register_agent(self, name: str, agent): """注册Agent""" self.agents[name] = agent logger.info(f"Agent {name} registered") def define_workflow(self, workflow: Dict[str, Any]): """定义工作流""" self.task_graph = workflow logger.info("Workflow defined") async def execute_workflow(self) -> Dict[str, Any]: """执行工作流""" logger.info("Starting workflow execution") # 执行任务 for task in self.task_graph['tasks']: await self._execute_task(task) logger.info("Workflow execution completed") return self.results async def _execute_task(self, task: Dict[str, Any]): """执行任务""" task_id = task['id'] task_type = task['type'] agent_name = task['agent'] dependencies = task.get('dependencies', []) logger.info(f"Executing task {task_id}") # 等待依赖任务完成 for dep_id in dependencies: await self._wait_for_task(dep_id) # 获取Agent agent = self.agents.get(agent_name) if not agent: raise ValueError(f"Agent {agent_name} not found") # 执行任务 if task_type == 'code_generation': result = await agent._generate_code(task['params']) elif task_type == 'code_review': result = await agent._review_code(task['params']) elif task_type == 'test_generation': result = await agent._generate_tests(task['params']) else: result = await agent._default_task(task['params']) # 保存结果 self.results[task_id] = result logger.info(f"Task {task_id} completed") async def _wait_for_task(self, task_id: str): """等待任务完成""" while task_id not in self.results: await asyncio.sleep(0.1) # 使用示例 async def main(): """主函数""" orchestrator = TaskOrchestrator({}) # 注册Agent code_agent = AsyncCodeGenerationAgent({}) review_agent = AsyncCodeReviewAgent({}) test_agent = AsyncTestGenerationAgent({}) await code_agent.start() await review_agent.start() await test_agent.start() orchestrator.register_agent('code', code_agent) orchestrator.register_agent('review', review_agent) orchestrator.register_agent('test', test_agent) # 定义工作流 workflow = { 'tasks': [ { 'id': 'task1', 'type': 'code_generation', 'agent': 'code', 'params': {'name': 'User Service', 'function_name': 'create_user'} }, { 'id': 'task2', 'type': 'code_generation', 'agent': 'code', 'params': {'name': 'Product Service', 'function_name': 'create_product'} }, { 'id': 'task3', 'type': 'code_review', 'agent': 'review', 'params': {'code': 'result from task1'}, 'dependencies': ['task1'] }, { 'id': 'task4', 'type': 'code_review', 'agent': 'review', 'params': {'code': 'result from task2'}, 'dependencies': ['task2'] }, { 'id': 'task5', 'type': 'test_generation', 'agent': 'test', 'params': {'code': 'result from task1'}, 'dependencies': ['task1', 'task3'] }, { 'id': 'task6', 'type': 'test_generation', 'agent': 'test', 'params': {'code': 'result from task2'}, 'dependencies': ['task2', 'task4'] } ] } orchestrator.define_workflow(workflow) # 执行工作流 results = await orchestrator.execute_workflow() print("Workflow results:") for task_id, result in results.items(): print(f" {task_id}: {result.get('status', 'unknown')}") if __name__ == '__main__': asyncio.run(main()) ```> **任务编排特点**: > - 定义任务依赖关系 > - 自动处理任务执行顺序 > - 支持并行执行 > - 结果收集和传递 ### 2. 任务分发模式 ````python `python # 示例:任务分发 用户请求: "实现一个任务分发器,将任务分发给多个异步子Agent" Claude Code 生成的代码: ```python ```python import asyncio from typing import Dict, Any, List import random import logging logger = logging.getLogger(__name__) class TaskDispatcher: """任务分发器""" def __init__(self, context): self.context = context self.agents = [] self.task_queue = asyncio.Queue() self.running = False def register_agent(self, agent): """注册Agent""" self.agents.append(agent) logger.info(f"Agent registered, total: {len(self.agents)}") async def start(self): """启动分发器""" self.running = True logger.info("TaskDispatcher started") # 启动分发线程 asyncio.create_task(self._dispatch_tasks()) async def stop(self): """停止分发器""" self.running = False logger.info("TaskDispatcher stopped") async def submit_task(self, task: Dict[str, Any]): """提交任务""" await self.task_queue.put(task) logger.info(f"Task submitted: {task.get('id', 'unknown')}") async def _dispatch_tasks(self): """分发任务""" while self.running: try: # 获取任务 task = await asyncio.wait_for( self.task_queue.get(), timeout=1.0 ) # 选择Agent agent = self._select_agent(task) if agent: # 提交任务给Agent await agent.submit_task(task['id'], task) logger.info(f"Task {task['id']} dispatched to {agent.__class__.__name__}") else: logger.warning(f"No available agent for task {task['id']}") except asyncio.TimeoutError: continue except Exception as e: logger.error(f"Error dispatching task: {e}") def _select_agent(self, task: Dict[str, Any]) -> Any: """选择Agent""" if not self.agents: return None # 根据任务类型选择Agent task_type = task.get('type', 'default') for agent in self.agents: if hasattr(agent, 'can_handle') and agent.can_handle(task_type): return agent # 随机选择一个Agent return random.choice(self.agents) class LoadBalancedDispatcher(TaskDispatcher): """负载均衡分发器""" def __init__(self, context): super().__init__(context) self.agent_loads = {} def register_agent(self, agent): """注册Agent""" super().register_agent(agent) self.agent_loads[agent] = 0 async def _dispatch_tasks(self): """分发任务(负载均衡)""" while self.running: try: # 获取任务 task = await asyncio.wait_for( self.task_queue.get(), timeout=1.0 ) # 选择负载最低的Agent agent = self._select_least_loaded_agent(task) if agent: # 提交任务给Agent await agent.submit_task(task['id'], task) self.agent_loads[agent] += 1 logger.info(f"Task {task['id']} dispatched to {agent.__class__.__name__} (load: {self.agent_loads[agent]})") else: logger.warning(f"No available agent for task {task['id']}") except asyncio.TimeoutError: continue except Exception as e: logger.error(f"Error dispatching task: {e}") def _select_least_loaded_agent(self, task: Dict[str, Any]) -> Any: """选择负载最低的Agent""" if not self.agents: return None # 找到负载最低的Agent min_load = float('inf') selected_agent = None for agent in self.agents: if self.agent_loads[agent] < min_load: min_load = self.agent_loads[agent] selected_agent = agent return selected_agent def update_agent_load(self, agent, delta: int): """更新Agent负载""" if agent in self.agent_loads: self.agent_loads[agent] += delta logger.info(f"Agent {agent.__class__.__name__} load updated to {self.agent_loads[agent]}") # 使用示例 async def main(): """主函数""" dispatcher = LoadBalancedDispatcher({}) # 注册Agent agent1 = AsyncCodeGenerationAgent({}) agent2 = AsyncCodeGenerationAgent({}) agent3 = AsyncCodeGenerationAgent({}) await agent1.start() await agent2.start() await agent3.start() dispatcher.register_agent(agent1) dispatcher.register_agent(agent2) dispatcher.register_agent(agent3) # 启动分发器 await dispatcher.start() # 提交多个任务 for i in range(10): await dispatcher.submit_task({ 'id': f'task{i}', 'type': 'code_generation', 'name': f'Task {i}', 'function_name': f'func{i}' }) # 等待任务完成 await asyncio.sleep(5) # 停止分发器 await dispatcher.stop() # 停止Agent await agent1.stop() await agent2.stop() await agent3.stop() if __name__ == '__main__': asyncio.run(main()) ```> **任务分发特点**: > - 负载均衡 > - 动态Agent选择 > - 负载监控 > - 任务路由 ## 异步任务监控 ### 1. 任务状态监控 ````python `python # 示例:任务状态监控 用户请求: "实现一个任务状态监控系统" Claude Code 生成的代码: ```python ```python import asyncio from typing import Dict, Any from datetime import datetime import logging logger = logging.getLogger(__name__) class TaskMonitor: """任务监控器""" def __init__(self): self.tasks = {} self.events = [] def register_task(self, task_id: str, task_info: Dict[str, Any]): """注册任务""" self.tasks[task_id] = { 'id': task_id, 'status': 'pending', 'created_at': datetime.utcnow(), 'updated_at': datetime.utcnow(), **task_info } self._log_event(task_id, 'registered') logger.info(f"Task {task_id} registered") def update_task_status(self, task_id: str, status: str, **kwargs): """更新任务状态""" if task_id not in self.tasks: logger.warning(f"Task {task_id} not found") return self.tasks[task_id]['status'] = status self.tasks[task_id]['updated_at'] = datetime.utcnow() self.tasks[task_id].update(kwargs) self._log_event(task_id, f'status_changed_to_{status}') logger.info(f"Task {task_id} status updated to {status}") def get_task_status(self, task_id: str) -> Dict[str, Any]: """获取任务状态""" return self.tasks.get(task_id, {}) def get_all_tasks(self) -> Dict[str, Any]: """获取所有任务""" return self.tasks def get_tasks_by_status(self, status: str) -> Dict[str, Any]: """根据状态获取任务""" return { task_id: task for task_id, task in self.tasks.items() if task['status'] == status } def get_task_statistics(self) -> Dict[str, Any]: """获取任务统计""" stats = { 'total': len(self.tasks), 'pending': 0, 'processing': 0, 'completed': 0, 'failed': 0, 'timeout': 0 } for task in self.tasks.values(): status = task['status'] if status in stats: stats[status] += 1 return stats def _log_event(self, task_id: str, event: str): """记录事件""" self.events.append({ 'task_id': task_id, 'event': event, 'timestamp': datetime.utcnow() }) def get_task_events(self, task_id: str) -> list: """获取任务事件""" return [ event for event in self.events if event['task_id'] == task_id ] # 使用示例 async def main(): """主函数""" monitor = TaskMonitor() # 注册任务 monitor.register_task('task1', {'name': 'Task 1', 'type': 'code_generation'}) monitor.register_task('task2', {'name': 'Task 2', 'type': 'code_review'}) monitor.register_task('task3', {'name': 'Task 3', 'type': 'test_generation'}) # 更新任务状态 monitor.update_task_status('task1', 'processing') await asyncio.sleep(1) monitor.update_task_status('task1', 'completed', result='success') monitor.update_task_status('task2', 'processing') await asyncio.sleep(1) monitor.update_task_status('task2', 'failed', error='validation error') monitor.update_task_status('task3', 'processing') await asyncio.sleep(1) monitor.update_task_status('task3', 'timeout') # 获取任务状态 print("Task 1 status:", monitor.get_task_status('task1')) print("Task 2 status:", monitor.get_task_status('task2')) print("Task 3 status:", monitor.get_task_status('task3')) # 获取任务统计 stats = monitor.get_task_statistics() print("\nTask Statistics:") for key, value in stats.items(): print(f" {key}: {value}") # 获取任务事件 print("\nTask 1 events:") for event in monitor.get_task_events('task1'): print(f" {event['timestamp']}: {event['event']}") if __name__ == '__main__': asyncio.run(main())
任务监控特点:
- 实时状态跟踪
- 任务统计
- 事件记录
- 状态查询